// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package wsPool

import (
	"errors"
	"gitee.com/rczweb/wsPool/util/gmap"
	"gitee.com/rczweb/wsPool/util/queue"
	"log"
	"time"
)

var (
	//最大连接池缓冲处理连接对像管道长度
	Max_client_channel_len = 10240
	//最大全局广播缓冲处理管道长度 	//最大频道广播缓冲处理管道长度 0为全局广播频道
	Max_broadcastQueue_len = 10240

	//最大接收消息缓冲处理管道长度
	Max_recvCh_len = 1024
	//最大发送消息缓冲处理管道长度
	Max_sendCh_len = 1024
)

type broadcastMessage struct {
	Channel string
	Msg     *Message
}

//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息，60秒后原ws未连接消息失效)
type oldMsg struct {
	list       *queue.PriorityQueue
	Expiration time.Time //过期时间
}

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
	// Registered clients.
	clients    *gmap.StrAnyMap //map[string]*Client// //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
	oldClients *gmap.StrAnyMap //缓存断开的连接消息队列

	// Inbound messages from the clients.
	//可以用于广播所有连接对象 //广播指定频道的管道 频道Channel=""时广播所有
	broadcastQueue chan *broadcastMessage

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan string
}

func newHub() *hub {
	return &hub{
		register:   make(chan *Client, Max_client_channel_len),
		unregister: make(chan string, Max_client_channel_len),

		clients:        gmap.NewStrAnyMap(true), //make(map[string]*Client),//
		oldClients:     gmap.NewStrAnyMap(true), //make(map[string]*Client),//
		broadcastQueue: make(chan *broadcastMessage, Max_broadcastQueue_len),
	}
}

func (h *hub) run() {
loop:
	for {
		select {
		case id, ok := <-h.unregister:
			if !ok {
				break loop
			}
			c := h.clients.Get(id)
			if c != nil {
				h.clients.Remove(id)
			}
			log.Println("取消注册ws连接对象：", id, "连接总数：", h.clients.Size())

		case client, ok := <-h.register:
			if !ok {
				break loop
			}
			log.Println("注册ws连接对象：", client.Id, "连接总数：", h.clients.Size())
			h.clients.Set(client.Id, client)

		case broadcastMsg, ok := <-h.broadcastQueue:
			if !ok {
				break loop
			}
			if broadcastMsg.Channel == "" {
				h.clients.Iterator(func(id string, v interface{}) bool {
					if v != nil {
						client := v.(*Client)
						client.send(broadcastMsg.Msg)
					}
					return true
				})
			} else {
				//广播指定频道的消息处理
				h.clients.Iterator(func(id string, v interface{}) bool {
					if v != nil {
						client := v.(*Client)
						if searchStrArray(client.channel, broadcastMsg.Channel) {
							client.send(broadcastMsg.Msg)
						}
					}
					return true
				})
			}

		}

	}
}
func (h *hub) ticker() {
	//定时清理清理缓存的旧的连接对像
	ticket := time.NewTimer(time.Second * 30)
	for {
		select {
		case <-ticket.C:
			if h.oldClients.Size() > 0 {
				h.oldClients.Iterator(func(k string, v interface{}) bool {
					if v != nil {
						client := v.(*Client)
						if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
							//3分钟后清理组存中的旧连接对像
							//h.clearOldClient <- client
							h.clearOldClient(client)
						}
					}
					return true
				})
			}
		}
	}

}

func (h *hub) AddClient(client *Client) error {
	timeout := time.NewTimer(time.Second * 3)
	defer timeout.Stop()
	select {
	case h.register <- client:
		return nil
	case <-timeout.C:
		return errors.New("AddClient register消息管道blocked,写入消息超时")
	}
}

func (h *hub) clearOldClient(client *Client) {
	close(client.recvCh)
	close(client.sendCh)
	h.oldClients.Remove(client.Id)
	log.Println("己断开的ws连接缓存对象：", client.Id, "旧连接总数：", h.oldClients.Size())
}

func (h *hub) RemoveClient(client *Client) error {
	//把连接对像缓存在旧对像列表中，并设置连接断开的时间，过期未连接就会清理对像
	client.CloseTime = time.Now()
	h.oldClients.Set(client.Id, client)
	timeout := time.NewTimer(time.Second * 1)
	defer timeout.Stop()
	select {
	case h.unregister <- client.Id:
		return nil
	case <-timeout.C:
		return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
	}
}

func (h *hub) Broadcast(channel string, msg *Message) error {
	timeout := time.NewTimer(time.Millisecond * 800)
	defer timeout.Stop()
	select {
	case wsSever.hub.broadcastQueue <- &broadcastMessage{Channel: channel, Msg: msg}:
		return nil
	case <-timeout.C:
		return errors.New("hub.broadcastQueue 消息管道blocked,写入消息超时,管道长度：" + string(len(wsSever.hub.broadcastQueue)))
	}
}
